PostgreSQL 17 增量备份工作实践

在 PostgreSQL 16 及之前的版本中,数据库备份主要依赖于全量备份(Full Backup)+ WAL 日志的方案。这种传统备份模式虽然可靠,但存在诸多不便:

存储效率低下

  • 每次备份都需要复制整个数据库
  • 对于大型数据库,单次备份可能需要数小时
  • 备份文件体积巨大,存储成本高昂

性能开销显著

  • 全量备份期间会显著增加 I/O 负载
  • 长时间备份可能影响数据库的整体性能
  • 在高并发场景下,备份会造成明显的性能抖动

恢复时间长

  • 从全量备份恢复需要大量时间
  • 大型数据库的恢复可能需要数小时甚至数天
  • 业务中断时间较长

不过,以上“不便之处”是从网上 copy 的。在我工作实践中,全量备份 + WAL 日志备份方案的主要问题是:

  1. 大量 WAL 日志备份管理起来不方便,都是一些小文件,很难判断中间是否漏掉了某个 WAL 日志。
  2. 基于第一点,我需要更加频繁的全量备份,这会导致存储的膨胀。

引入 PostgreSQL 17 增量备份

PostgreSQL 17 直接官方引入了增量备份特性,从根本上解决了传统备份方案的诸多痛点。

但是我依然存在一个疑惑,增量备份虽然高效,但并非完美。它只备份了两次备份点之间发生变化的数据块,这意味着存在时间间隙风险:

如果最后一次增量备份已经完成,而下次备份时间还未到,在这个时间窗口发生故障,可能导致数据丢失。如何保持对这部分时间的数据变更的捕获呢?

所以最终还是要回到 WAL 日志上来。但是我可以只保留最后一次增量备份时间节点之后的 WAL 日志。

通过这个思路,我设计了一个全新的备份脚本,详细的脚本将会附在文章最后,这里先简单描述一下我的思路:

  1. 首先我设定了备份文件夹的名称规则 {YYYY}-{mm}-{dd}-{HHMMSS}_{TYPE}_{INDEX} ,实际备份文件夹名称类似 2025-01-23-050002_FULL_02025-01-24-223002_INCR_17 ,这样能够清晰的判断备份的时间,每轮备份的次数,备份的类型,还可以通过文件夹名称进行排序。
  2. 我在每次备份前,列出当前的所有备份,通过文件夹名称进行排序,拿到最新的一次备份,根据它来判断本次备份应该是做全量备份还是增量备份,以及获得一个新的索引号,此时就可以创建一个新的备份目录。
  3. 调用 pg_basebackup 来做备份。
  4. 备份完成后,检查一下备份的三个文件 base.tar pg_wal.tar backup_manifest 是否存在
  5. 然后开始清理 WAL 归档。pg 的 WAL 归档很有特点,在每次用 pg_basebackup 执行完备份后(无论是全量备份还是增量备份),都会生产一个 xxxxxxxxx_xxxxxxxxx.backup 的 WAL。所以,我对 WAL 归档目录中所有 .backup 后缀名文件进行排序,以第二个 .backup 文件为基准,删除在此之前产生的 WAL 。因为我们已经执行了 pg_basebackup ,这些 WAL 归档不再需要保留了。
  6. 最后清理不需要的备份,列出备份目录中所有包含 FULL 字符串的子文件夹,然后排序,看看需要保留到哪一个全量备份,然后删除在这个全量备份之前的所有子文件夹。
  7. 需要检查一下本轮备份的每个文件夹是不是都在,然后计算一下当前备份占用的数据量。
  8. 结束之后,用钉钉 webhook 发个通知~

具体实施步骤

1. 修改 postgresql.conf 使其支持 WAL 归档和增量备份

postgresql.conf 的以下项目进行修改

# 需要修改 wal 的级别
wal_level = replica

# 开启 WAL 归档
archive_mode = on
# 设置 WAL 归档目录
archive_command = 'copy %p D:\\********\\Achieve\\%f'

# 开启walsummarizer进程来记录WAL摘要信息,增量备份依赖这个参数
summarize_wal = on
# 会根据时间周期自动清理 WAL 摘要信息文件,需要保证一轮全量备份+增量备份保持在这个周期内。
wal_summary_keep_time = '10d'

然后重启 postgresql 服务

2. 创建 .env 文件

在脚本目录创建一个 .env 用来存储一些配置项:

# 备份父目录,用来存储备份的子文件夹
BACKUP_DIR = "D:/********/Backup"

# archive_command 中设定的 WAL 归档目录
ARCHIVE_DIR = "D:/********/Achieve"

# 日志文件位置
LOG_FILE = "D:/********/log.txt"

# pg_basebackup 执行文件的路径
PG_BASEBACKUP_PATH = "C:/Program Files/PostgreSQL/17/bin/pg_basebackup.exe"

# 数据库连接用户
PGUSER = "postgres"
# 数据库连接密码
PGPASSWORD = "************"

# 最大增量备份数,比如设定为 6 则每轮备份会有 1 次全量备份和 6 次增量备份,
# 如果每天运行一次脚本,那么完整的备份周期是 7 天一轮,每轮 7 个文件
MAX_INCREMENT = "6"

# 最大备份轮数,设为 2 时,如果超过两轮备份,更老的备份会被删除
MAX_ROLL = "2"

# 钉钉 webhook url
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=******"
# 钉钉 webhook 通知需要 at 的人的电话号码
DINGTALK_AT_MOBILE = "137******87"

3. 创建 backup.py 脚本

在脚本目录创建脚本:

import logging  
import os  
import shutil  
import subprocess  
import time  
from datetime import datetime  
from logging.handlers import RotatingFileHandler  
from pathlib import Path  
from typing import Literal  
  
import httpx  
from dotenv import load_dotenv  
  
  
def init_logging(log_file):  
    """  
    设置日志记录。  
    """    file_handler = RotatingFileHandler(log_file, maxBytes=200 * 1024, backupCount=3, encoding="utf-8")  
    console_handler = logging.StreamHandler()  
    logging.basicConfig(  
        level=logging.INFO,  
        format='%(asctime)s - %(levelname)s - %(message)s',  
        handlers=[file_handler, console_handler]  
    )  
  
  
def perform_incr_backup(pg_basebackup_path:Path, pguser, backup_dir:Path, last_backup_dir:Path|None, backup_type:Literal['FULL', 'INCR']):  
    if last_backup_dir.exists() and backup_type == 'INCR':  
        cmd = [str(pg_basebackup_path), '-Ft', '-D', str(backup_dir), '-v', '-i', str(last_backup_dir / 'backup_manifest'), '-U', pguser]  
    else:  
        cmd = [str(pg_basebackup_path), '-Ft', '-D', str(backup_dir), '-P', '-U', pguser]  
  
    try:  
        _r = subprocess.run(cmd, env=os.environ.copy(), capture_output=True, text=True, check=True)  
        return True, _r  
    except subprocess.CalledProcessError as _e:  
        return False, _e  
  
  
def check_success(backup_dir:Path):  
    if (  
        backup_dir.is_dir() and  
        (backup_dir / 'base.tar').is_file() and  
        (backup_dir / 'pg_wal.tar').is_file() and  
        (backup_dir / 'backup_manifest').is_file()  
    ):  
        return True  
    else:  
        return False  
  
  
def get_latest_backup(backup_dir:Path, max_increment):  
    _sub_dirs = [item for item in backup_dir.iterdir() if item.is_dir()]  
  
    # 获取上一次备份的索引  
    if len(_sub_dirs) == 0:  
        _latest_dir = None  
        _latest_index = None  
    else:  
        _latest_dir = sorted(_sub_dirs, key=lambda x: x.name)[-1]  
        _latest_index = int(_latest_dir.name.split("_")[-1])  
  
    #  上一次的索引小于最大增量备份数时,进行增量备份,否则进行全量备份  
    if _latest_index is not None and _latest_index < max_increment:  
        backup_type:Literal['FULL', 'INCR'] = "INCR"  
        index = _latest_index + 1  
    else:  
        backup_type:Literal['FULL', 'INCR'] = "FULL"  
        index = 0  
  
    return _latest_dir, index, backup_type  
  
  
def clean_wal(wal_archive_dir:Path):  
    #  获取 WAL 归档目录中,后缀名为 .backup 的文件  
    wal_files = [f for f in wal_archive_dir.glob('*.backup') if f.is_file()]  
  
    if len(wal_files) >= 1:  
        sorted_backup_files = sorted(wal_files, key=os.path.getctime, reverse=True)  
        reference_wal_file = sorted_backup_files[1]  
        reference_wal_time = os.path.getctime(reference_wal_file)  
        reference_wal_time_desc = datetime.fromtimestamp(reference_wal_time).strftime('%Y-%m-%d %H:%M:%S')  
  
        logging.info(f"       计划清除 {reference_wal_file}{reference_wal_time_desc}) 之前的 WAL 文件")  
  
        all_wal_files = [f for f in wal_archive_dir.iterdir() if f.is_file() and os.path.getctime(f) < reference_wal_time]  
  
        del_count, err_count, all_count = 0, 0, len(all_wal_files)  
        for file in all_wal_files:  
            try:  
                os.remove(file)  
                del_count += 1  
                logging.debug(f"       已删除归档的 WAL 文件: {file}")  
            except Exception as _e:  
                logging.error(f"       在删除归档 WAL 文件: {file} 时出错: {_e}")  
                err_count += 1  
        logging.info(f"       共计 {del_count}/{all_count} 个 WAL 文件归档清理完毕,错误 {err_count} 个。")  
        return del_count, err_count, all_count  
    else:  
        logging.info(f"       无需清理归档的 WAL 文件")  
        return 0, 0, 0  
  
  
def clean_backup(max_roll:int,  backup_dir:Path):  
    full_backups = [  
        folder for folder in backup_dir.iterdir()  
        if folder.is_dir() and 'FULL' in folder.name  
    ]  
    full_backups.sort(key=lambda x: x.name)  
    logging.info(f"       计划保留 {max_roll} 轮备份,当前存在 {len(full_backups)} 轮备份")  
  
    if len(full_backups) > max_roll:  
        cutoff_backup = full_backups[- max_roll]  
        backups_to_remove = [  
            folder for folder in backup_dir.iterdir()  
            if folder.is_dir() and  
               folder.stat().st_ctime < cutoff_backup.stat().st_ctime  
        ]  
        logging.info(f"       正在清除 {cutoff_backup} 之前创建的所有备份")  
  
        del_count, err_count, all_count = 0, 0, len(backups_to_remove)  
        for backup in backups_to_remove:  
            try:  
                shutil.rmtree(backup)  
                logging.debug(f"       已删除备份 {backup}")  
                del_count += 1  
            except Exception as _e:  
                logging.error(f"       在删除备份 {backup} 时出错: {_e}")  
                err_count += 1  
  
        logging.info(f"       共计 {del_count}/{all_count} 个备份清理完毕,错误 {err_count} 个。")  
        return del_count, err_count, all_count  
    else:  
        logging.info(f"       无需清理备份文件")  
        return 0, 0, 0  
  
  
def dingtalk_alert(webhook_url:str, at_mobiles:str, title:str, text:str):  
    httpx.post(webhook_url, json={  
        "msgtype": "markdown",  
        "markdown": {"title":title, "text": text},  
        "at": {"atMobiles": [at_mobiles], "isAtAll": False}  
    })  
  
  
def count_backup(backup_dir:Path, now_index:int):  
    full_backups = [  
        folder for folder in backup_dir.iterdir()  
        if folder.is_dir() and 'FULL' in folder.name  
    ]  
    full_backups.sort(key=lambda x: x.stat().st_ctime)  
    this_roll_full_backup = full_backups[-1]  
    later_backups = [  
        folder for folder in backup_dir.iterdir()  
        if folder.is_dir() and folder.stat().st_ctime > this_roll_full_backup.stat().st_ctime  
    ]  
    later_backups_count = len(later_backups)  
    check = (later_backups_count == now_index and all('INCR' in folder.name for folder in later_backups))  
  
    incr_backups = [  
        folder for folder in backup_dir.iterdir()  
        if folder.is_dir() and 'INCR' in folder.name  
    ]  
  
    files = [f for f in backup_dir.rglob('*') if f.is_file()]  
    total_size = sum(f.stat().st_size for f in files)  
    total_gb = round(total_size / (1024 * 1024 * 1024), 2)  
  
  
    return len(full_backups), len(incr_backups), total_gb, check  
  
  
if __name__ == "__main__":  
    #  加载 .env 文件  
    load_dotenv()  
  
    #  从环境变量读取配置  
    BACKUP_DIR = Path(os.getenv('BACKUP_DIR'))  
    ARCHIVE_DIR = Path(os.getenv('ARCHIVE_DIR'))  
    PG_BASEBACKUP_PATH = Path(os.getenv('PG_BASEBACKUP_PATH'))  
    PGUSER = os.getenv('PGUSER')  
    PGPASSWORD = os.getenv('PGPASSWORD')  
    LOG_FILE = os.getenv('LOG_FILE')  
    MAX_INCREMENT = int(os.getenv('MAX_INCREMENT'))  
    MAX_ROLL = int(os.getenv('MAX_ROLL'))  
  
    DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK')  
    DINGTALK_AT_MOBILE = os.getenv('DINGTALK_AT_MOBILE')  
  
    #  初始化日志记录器  
    init_logging(LOG_FILE)  
  
    logging.info(">>>>> BACKUP START")  
    t = time.time()  
  
    #  获取当前最新的备份  
  
    latest_backup, next_index, next_backup_type = get_latest_backup(BACKUP_DIR, MAX_INCREMENT)  
  
    logging.info(f"STEP-1 查询到上一次的备份是:{latest_backup}")  
    logging.info(f"STEP-2 正在准备进行 {next_backup_type} 备份,为该轮备份的第 {next_index}/{MAX_INCREMENT} 次备份。")  
  
    #  创建新备份的目录  
    new_backup:Path = BACKUP_DIR / datetime.now().strftime(f"%Y-%m-%d-%H%M%S_{next_backup_type}_{next_index}")  
    new_backup.mkdir(parents=True, exist_ok=True)  
  
    logging.info(f"       为新备份创建了目录:{new_backup}")  
  
    logging.info(f"       调用 {PG_BASEBACKUP_PATH} 进行备份中...")  
    logging.info(f"       使用 {PGUSER} 用户...")  
    is_success, result = perform_incr_backup(PG_BASEBACKUP_PATH, PGUSER, new_backup, Path(latest_backup), next_backup_type)  
    during_time = round((time.time() - t) / 60, 2)  
  
    #  如果备份成功  
    if is_success and check_success(new_backup):  
        logging.info(f"       备份成功: {new_backup}")  
  
        logging.info(f"STEP-3 准备清理 {ARCHIVE_DIR} 中归档的 WAL 文件")  
        wal_clean_del_count, wal_clean_err_count, wal_clean_all_count = clean_wal(ARCHIVE_DIR)  
  
        logging.info(f"STEP-4 准备清理 {BACKUP_DIR} 中的过期备份")  
        backup_clean_del_count, backup_clean_err_count, backup_clean_all_count = clean_backup(max_roll=MAX_ROLL, backup_dir=BACKUP_DIR)  
  
        alert = "🟢 数据库备份成功"  
    else:  
        shutil.rmtree(new_backup)  
        logging.error(f"      增量备份失败,已移除目录,失败原因: {result}")  
  
        wal_clean_del_count, wal_clean_err_count, wal_clean_all_count = 0, 0, 0  
        backup_clean_del_count, backup_clean_err_count, backup_clean_all_count = 0, 0, 0  
  
        alert = "🔴 数据库备份失败"  
  
    full_backups_count, incr_backups_count, backup_size, this_roll_check = count_backup(BACKUP_DIR, next_index)  
  
    alert_markdown = (  
            f"# {alert} \n "  
            f"### 本次备份 \n "  
            f"- 类型:{next_backup_type} \n "  
            f"- 轮次:{next_index}/{MAX_INCREMENT} \n "  
            f"- 文件:{new_backup} \n "  
            f"- 状态:{alert[-2:]} \n "  
            f"- 耗时:{during_time} 分钟 \n "  
            f"### 清理情况 \n "  
            f"- WAL清理:成功 {wal_clean_del_count}/{wal_clean_all_count} 错误 {wal_clean_err_count} \n "  
            f"- 备份清理:成功 {backup_clean_del_count}/{backup_clean_all_count} 错误 {backup_clean_err_count} \n "  
            f"### 备份状态 \n "  
            f"- 全量备份总数:{full_backups_count} \n "  
            f"- 增量备份总数:{incr_backups_count} \n "  
            f"- 备份数据总量:{backup_size}GB \n "  
            f"- 本轮备份完整性:{this_roll_check} \n "  
            f"--- \n "  
            f"{datetime.now().strftime('%Y/%m/%d %H:%M:%S')}"  
    )  
  
    dingtalk_alert(DINGTALK_WEBHOOK, DINGTALK_AT_MOBILE, alert[-7], alert_markdown)  
  
    logging.info(">>>>> BACKUP END\n\n")

4. 设定任务计划

因为我是用的 windows , 所以直接在任务计划程序中设定触发器,每天凌晨 4 点 15 分进行一次备份。

5. 测试脚本查阅日志

2025-01-25 13:47:28,363 - INFO - >>>>> BACKUP START
2025-01-25 13:47:28,364 - INFO - STEP-1 查询到上一次的备份是:D:\********\Backup\2025-01-25-133906_FULL_0
2025-01-25 13:47:28,364 - INFO - STEP-2 正在准备进行 FULL 备份,为该轮备份的第 0/24 次备份。
2025-01-25 13:47:28,364 - INFO -        为新备份创建了目录:D:\********\Backup\2025-01-25-134728_FULL_0
2025-01-25 13:47:28,364 - INFO -        调用 C:\Program Files\PostgreSQL\17\bin\pg_basebackup.exe 进行备份中...
2025-01-25 13:47:28,364 - INFO -        使用 postgres 用户...
2025-01-25 13:48:18,572 - INFO -        备份成功: D:\********\Backup\2025-01-25-134728_FULL_0
2025-01-25 13:48:18,572 - INFO - STEP-3 准备清理 D:\********\Achieve 中归档的 WAL 文件
2025-01-25 13:48:18,573 - INFO -        计划清除 D:\********\Achieve\000000010000006F000000A7.00000490.backup(2025-01-25 13:40:07) 之前的 WAL 文件
2025-01-25 13:48:18,573 - INFO -        共计 3/3 个 WAL 文件归档清理完毕,错误 0 个。
2025-01-25 13:48:18,573 - INFO - STEP-4 准备清理 D:\PostgreSQL_Backup\Backup 中的过期备份
2025-01-25 13:48:18,574 - INFO -        计划保留 2 轮备份,当前存在 2 轮备份
2025-01-25 13:48:18,574 - INFO -        无需清理备份文件
2025-01-25 13:48:19,541 - INFO - HTTP Request: POST https://oapi.dingtalk.com/robot/send?access_token=****** "HTTP/1.1 200 OK"
2025-01-25 13:48:19,544 - INFO - >>>>> BACKUP END

当然,也少不了通知推送:

image

脚本存在的不足和预期的改进

  1. 备份机制依然不够完善

    我目前是将其存储到了其他磁盘下,然后通过一个备份软件将其备份到云和专门的备份硬盘。虽然比较安全,但是增加了额外的软件依赖。

    我希望能够在一轮备份完成后,对整轮备份进行合并,然后归档到 s3 存储。

  2. 备份验证机制不完善

    目前只是验证了文件的存在,没有对其真实恢复能力进行自动化的检验。

    这一点,需要构建其他的工作流管道,比如从 s3 收到新的归档后,在云端创建服务器资源进行恢复验证。

  3. 尚未构建恢复脚本

    目前如果真的需要恢复,还是需要人工合并备份,到服务器上执行命令。尚未构建标准化的恢复流程。